graph TD
subgraph SingleShot["Single-Shot RAG"]
A["Complex Query"] --> B["Embed full query"]
B --> C["Retrieve top-k chunks"]
C --> D["Generate answer"]
D --> E["❌ Partial or wrong answer"]
end
subgraph Decomposed["Decomposed Retrieval"]
F["Complex Query"] --> G["Decompose into<br/>sub-questions"]
G --> H["Sub-Q1: Retrieve + Answer"]
G --> I["Sub-Q2: Retrieve + Answer<br/>(uses Sub-Q1 result)"]
G --> J["Sub-Q3: Retrieve + Answer<br/>(uses Sub-Q2 result)"]
J --> K["Compose final answer"]
K --> L["✅ Grounded multi-hop answer"]
end
style SingleShot fill:#fef2f2,stroke:#ef4444
style Decomposed fill:#f0fdf4,stroke:#22c55e
Planning and Query Decomposition for Complex Retrieval
Plan-and-execute agents, sub-question generation, and multi-hop retrieval over heterogeneous data sources
Keywords: planning, query decomposition, plan-and-execute, sub-question generation, multi-hop retrieval, multi-hop QA, plan-and-solve, self-ask, IRCoT, least-to-most, heterogeneous data sources, LangGraph, cognitive architecture, retrieval agent, compositionality gap, query routing, chain-of-thought retrieval

Introduction
Ask a retrieval agent “What is RAG?” and a single vector search + LLM call suffices. Ask it “How does the cost of hosting a fine-tuned Llama 3 model on vLLM compare to using GPT-4o through the API for a RAG pipeline processing 10,000 queries per day?” and the agent is stuck — no single retrieval can answer the question because the answer requires composing facts from multiple sources, in a specific order, with intermediate reasoning between retrievals.
This is the multi-hop retrieval problem. The user’s question isn’t atomic — it decomposes into sub-questions, each requiring its own retrieval from potentially different data sources, and the answer to one sub-question may determine what the next sub-question should be.
Research confirms the difficulty. Press et al. (2022) introduced the compositionality gap — measuring how often models can answer individual sub-questions correctly but fail to compose them into a correct multi-hop answer. They found that scaling model size improves single-hop recall faster than multi-hop composition, meaning bigger models don’t automatically solve the problem. What does solve it: explicit decomposition strategies like self-ask, where the model generates and answers follow-up questions before tackling the original.
The same insight applies to retrieval agents. Instead of throwing a complex query at a single retriever and hoping for the best, plan the retrieval: decompose the question into sub-questions, determine which data source each sub-question needs, retrieve answers sequentially or in parallel, and compose the final response from intermediate results.
This article implements three increasingly sophisticated planning strategies for retrieval agents:
- Plan-and-execute — generate a full plan upfront, execute each step with a retrieval agent, optionally re-plan based on intermediate results
- Sub-question decomposition — break complex queries into atomic sub-questions, retrieve and answer each independently, then synthesize
- Interleaved retrieval-reasoning (IRCoT) — alternate between chain-of-thought reasoning and retrieval, where each reasoning step determines the next retrieval
We build each pattern in LangGraph with working code, compare them on different query types, and show how to route sub-questions across heterogeneous data sources (vector stores, SQL databases, APIs, knowledge graphs).
Why Single-Shot Retrieval Fails for Complex Queries
The Compositionality Gap
Consider a multi-hop question: “Who is the CEO of the company that acquired the maker of ChatGPT?”
A human decomposes this naturally:
- Who made ChatGPT? → OpenAI
- Which company acquired OpenAI? → (This is a trick — OpenAI hasn’t been acquired, but Microsoft invested heavily)
- Who is the CEO of that company? → Satya Nadella
A standard RAG pipeline embeds the full question, retrieves chunks that happen to match some words, and often gets a garbled answer because no single document contains the full reasoning chain.
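To make the failure concrete, here is a toy sketch. The corpus, queries, and word-overlap scoring function are all hypothetical stand-ins (real systems use embeddings), but the effect is the same: top-1 retrieval for the composed question returns only the first hop's fact, while each atomic sub-question has one clearly best document.

```python
def tokens(text: str) -> set[str]:
    """Crude tokenizer: lowercase words with punctuation stripped."""
    return {w.strip("?.,!").lower() for w in text.split()}

def score(query: str, doc: str) -> float:
    """Stand-in for embedding similarity: Jaccard overlap of word sets."""
    q, d = tokens(query), tokens(doc)
    return len(q & d) / len(q | d)

# Hypothetical corpus: the reasoning chain spans all three documents.
corpus = [
    "OpenAI is the maker of ChatGPT.",
    "Microsoft invested heavily in OpenAI.",
    "Satya Nadella is the CEO of Microsoft.",
]

def retrieve(query: str) -> str:
    """Return the single best-scoring document for a query."""
    return max(corpus, key=lambda doc: score(query, doc))

# Single-shot: the composed question retrieves only the first hop's fact.
full_query = "Who is the CEO of the company that invested in the maker of ChatGPT?"

# Decomposed: each atomic sub-question cleanly matches one document.
hop1 = retrieve("Who is the maker of ChatGPT?")
hop2 = retrieve("Which company invested in OpenAI?")
hop3 = retrieve("Who is the CEO of Microsoft?")
```

The full query's top document cannot answer the question on its own; only the decomposed hops recover the chain ending at "Satya Nadella".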
Types of Multi-Hop Complexity
Not all complex queries are the same. The planning strategy depends on the type of complexity:
| Complexity Type | Example | Planning Strategy |
|---|---|---|
| Sequential composition | “What year was the director of Inception born?” | Chain of dependent sub-questions |
| Parallel composition | “Compare salaries of data scientists in SF vs NYC” | Independent sub-questions, parallel retrieval |
| Conditional branching | “If the API has rate limits, calculate cost; otherwise, estimate latency” | Plan with decision points |
| Heterogeneous sources | “What do our docs say about X and what does the database show for Y?” | Route sub-questions to different retrievers |
| Iterative refinement | “Find the best framework — check benchmarks, community size, and docs quality” | Retrieve, evaluate, retrieve more |
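The table above can drive a first-pass router. A toy keyword heuristic (standing in for what would be an LLM classification call in production — the category names and keywords here are illustrative):

```python
def classify_complexity(query: str) -> str:
    """Toy heuristic router mapping a query to a planning strategy.
    A production system would make an LLM call instead."""
    q = query.lower()
    if q.startswith("if ") or "otherwise" in q:
        return "conditional_branching"
    if "compare" in q or " vs " in q or " vs. " in q:
        return "parallel_composition"
    if "best" in q or "check" in q:
        return "iterative_refinement"
    if "docs say" in q and "database" in q:
        return "heterogeneous_sources"
    return "sequential_composition"
```

Even this crude version separates the table's example queries; the point is that the complexity type is a routing decision made before any retrieval happens.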
A good planning agent handles all of these. Let’s build one.
Plan-and-Execute Agents
The Architecture
The Plan-and-Execute paradigm separates planning from execution. Instead of the ReAct loop (think → act → observe → think again), the agent first generates a complete plan, then executes each step:
graph TD
A["User Query"] --> B["Planner LLM"]
B --> C["Step 1: Search for X"]
B --> D["Step 2: Query DB for Y"]
B --> E["Step 3: Calculate Z"]
B --> F["Step 4: Compose answer"]
C --> G["Executor Agent"]
G --> H["Result 1"]
H --> D
D --> I["Executor Agent"]
I --> J["Result 2"]
J --> E
E --> K["Executor Agent"]
K --> L["Result 3"]
L --> F
F --> M["Executor Agent"]
M --> N["Final Answer"]
style B fill:#9b59b6,color:#fff,stroke:#333
style G fill:#e67e22,color:#fff,stroke:#333
style I fill:#e67e22,color:#fff,stroke:#333
style K fill:#e67e22,color:#fff,stroke:#333
style M fill:#e67e22,color:#fff,stroke:#333
Benefits over pure ReAct:
- The planner sees the full scope of the task before any execution happens
- The executor focuses on one step at a time — simpler tool selection, less context pollution
- The plan itself is inspectable and auditable before execution
- Different LLMs can be used for planning (powerful, expensive) vs. execution (efficient, cheap)
Tradeoffs:
- More LLM calls total (planning + execution per step)
- The initial plan may be wrong — requires re-planning capability
- Less adaptive than ReAct for tasks where the next step depends entirely on the previous result
Implementation in LangGraph
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
import json
planner_llm = ChatOpenAI(model="gpt-4o", temperature=0)
executor_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
class PlanExecuteState(TypedDict):
messages: Annotated[list, add_messages]
plan: list[str] # List of planned steps
current_step: int # Index of the step being executed
step_results: list[dict] # Results from executed steps
final_answer: str
def plan_step(state: PlanExecuteState) -> dict:
"""Generate a plan to answer the complex query."""
query = state["messages"][-1].content
response = planner_llm.invoke([
{"role": "system", "content": (
"You are a research planning expert. Given a complex question, "
"decompose it into a sequence of concrete retrieval and reasoning steps.\n\n"
"Rules:\n"
"- Each step should be a single, actionable task\n"
"- Steps can reference results from previous steps as 'result of step N'\n"
"- The final step should always be 'Compose the final answer from all results'\n"
"- Output ONLY a JSON array of step strings\n"
"- Use 3-7 steps maximum"
)},
{"role": "user", "content": f"Question: {query}"},
])
try:
plan = json.loads(response.content)
except json.JSONDecodeError:
# Fallback: treat the response as a single step
plan = [response.content.strip(), "Compose the final answer from all results"]
return {"plan": plan, "current_step": 0, "step_results": []}
def execute_step(state: PlanExecuteState) -> dict:
"""Execute the current step in the plan."""
step_idx = state["current_step"]
step = state["plan"][step_idx]
previous_results = state.get("step_results", [])
# Build context from previous step results
context = ""
if previous_results:
context = "Previous results:\n"
for i, result in enumerate(previous_results):
context += f"Step {i + 1}: {result['step']}\nResult: {result['result']}\n\n"
response = executor_llm.invoke([
{"role": "system", "content": (
"You are a research execution agent with access to retrieval tools. "
"Execute the given step using the provided context from previous steps. "
"Be factual and concise."
)},
{"role": "user", "content": f"{context}Current task: {step}"},
])
new_results = list(previous_results)
new_results.append({"step": step, "result": response.content})
return {"step_results": new_results, "current_step": step_idx + 1}
def should_continue(state: PlanExecuteState) -> str:
"""Check if there are more steps to execute."""
if state["current_step"] >= len(state["plan"]):
return "compose"
return "execute"
def compose_answer(state: PlanExecuteState) -> dict:
"""Compose the final answer from all step results."""
query = state["messages"][-1].content
results = state.get("step_results", [])
results_text = "\n\n".join(
f"**Step {i + 1}**: {r['step']}\n**Result**: {r['result']}"
for i, r in enumerate(results)
)
response = planner_llm.invoke([
{"role": "system", "content": (
"Compose a comprehensive answer to the original question "
"using the research results below. Cite which step each fact comes from."
)},
{"role": "user", "content": (
f"Original question: {query}\n\n"
f"Research results:\n{results_text}"
)},
])
return {
"messages": [{"role": "assistant", "content": response.content}],
"final_answer": response.content,
}
# Build the graph
graph = StateGraph(PlanExecuteState)
graph.add_node("plan", plan_step)
graph.add_node("execute", execute_step)
graph.add_node("compose", compose_answer)
graph.add_edge(START, "plan")
graph.add_edge("plan", "execute")
graph.add_conditional_edges("execute", should_continue, {
"execute": "execute",
"compose": "compose",
})
graph.add_edge("compose", END)
checkpointer = MemorySaver()
plan_execute_agent = graph.compile(checkpointer=checkpointer)
Adding Re-Planning
The initial plan is a best guess. After each execution step, the agent may discover that the plan needs adjustment — a step returned unexpected results, a data source was unavailable, or new information changes the remaining steps.
class PlanExecuteWithReplanState(TypedDict):
messages: Annotated[list, add_messages]
plan: list[str]
current_step: int
step_results: list[dict]
replan_count: int
final_answer: str
def maybe_replan(state: PlanExecuteWithReplanState) -> dict:
"""Evaluate whether the plan needs adjustment after the latest step."""
if state["current_step"] >= len(state["plan"]):
return {} # Plan complete, no replanning needed
results = state.get("step_results", [])
remaining_steps = state["plan"][state["current_step"]:]
latest = results[-1] if results else None
response = planner_llm.invoke([
{"role": "system", "content": (
"You are evaluating whether a research plan needs adjustment.\n"
"Given the latest result and remaining steps, decide:\n"
"1. CONTINUE — the plan is still valid\n"
"2. REPLAN — output a revised list of remaining steps as JSON array\n\n"
"Output format: {\"action\": \"CONTINUE\"} or "
"{\"action\": \"REPLAN\", \"new_steps\": [...]}"
)},
{"role": "user", "content": (
f"Latest result: {latest}\n\n"
f"Remaining steps: {remaining_steps}"
)},
])
try:
decision = json.loads(response.content)
except json.JSONDecodeError:
return {}
if decision.get("action") == "REPLAN" and "new_steps" in decision:
completed_steps = state["plan"][:state["current_step"]]
new_plan = completed_steps + decision["new_steps"]
return {"plan": new_plan, "replan_count": state.get("replan_count", 0) + 1}
return {}
def should_continue_with_replan(state: PlanExecuteWithReplanState) -> str:
if state["current_step"] >= len(state["plan"]):
return "compose"
if state.get("replan_count", 0) > 3:
return "compose" # Safety: don't replan forever
return "replan"
# Build the graph with replanning
graph = StateGraph(PlanExecuteWithReplanState)
graph.add_node("plan", plan_step)
graph.add_node("execute", execute_step)
graph.add_node("replan", maybe_replan)
graph.add_node("compose", compose_answer)
graph.add_edge(START, "plan")
graph.add_edge("plan", "execute")
graph.add_conditional_edges("execute", should_continue_with_replan, {
"replan": "replan",
"compose": "compose",
})
graph.add_edge("replan", "execute")
graph.add_edge("compose", END)
plan_execute_replan_agent = graph.compile()
graph TD
A["User Query"] --> B["Plan"]
B --> C["Execute Step"]
C --> D{"More steps?"}
D -->|Yes| E["Re-Plan?"]
E -->|Plan OK| C
E -->|Adjusted| F["Updated Plan"]
F --> C
D -->|No| G["Compose Answer"]
G --> H["Final Response"]
style B fill:#9b59b6,color:#fff,stroke:#333
style C fill:#e67e22,color:#fff,stroke:#333
style E fill:#f5a623,color:#fff,stroke:#333
style G fill:#1abc9c,color:#fff,stroke:#333
Plan-and-Execute vs. ReAct
| Aspect | ReAct | Plan-and-Execute |
|---|---|---|
| Planning horizon | One step at a time | Full plan upfront |
| Adaptability | Highly adaptive — each step sees all history | Needs explicit re-planning |
| Context growth | Full history accumulates in prompt | Only step results, not full trace |
| Debuggability | Inspect each thought step | Inspect the plan itself |
| LLM calls | 1 per reasoning step | 1 for plan + 1 per step + 1 for composition |
| Best for | Simple tool routing, < 5 steps | Complex multi-step research, 5-15 steps |
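Stripped of LLM calls, the plan-and-execute control flow reduces to a plain loop, which is handy for unit-testing the wiring before plugging in real models. Every function below is a stub standing in for the planner, executor, and composer nodes:

```python
def run_plan_execute(query, plan_fn, execute_fn, compose_fn):
    """Plan once, execute each step with access to prior results, compose."""
    plan = plan_fn(query)
    step_results = []
    for step in plan:
        # Each step sees all previous results, mirroring execute_step above
        step_results.append({"step": step, "result": execute_fn(step, step_results)})
    return compose_fn(query, step_results)

# Stubs standing in for the LLM-backed nodes.
fake_plan = lambda q: ["retrieve A", "retrieve B", "combine A and B"]
fake_execute = lambda step, prior: f"{step} (with {len(prior)} prior results)"
fake_compose = lambda q, results: f"answered '{q}' in {len(results)} steps"

answer = run_plan_execute("test query", fake_plan, fake_execute, fake_compose)
```

Swapping the stubs for the real planner and executor recovers the LangGraph version; the loop and state shape stay the same.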
Sub-Question Decomposition
The Pattern
Sub-question decomposition explicitly breaks a complex query into independent or dependent atomic questions, each answerable by a single retrieval:
graph TD
A["Complex Query:<br/>'Compare RAG costs: self-hosted vs API'"] --> B["Decomposer LLM"]
B --> C["SQ1: What are typical<br/>self-hosted RAG costs?"]
B --> D["SQ2: What are typical<br/>API-based RAG costs?"]
B --> E["SQ3: What are the<br/>hidden costs of each?"]
C --> F["Retrieve & Answer"]
D --> G["Retrieve & Answer"]
E --> H["Retrieve & Answer"]
F --> I["Synthesizer LLM"]
G --> I
H --> I
I --> J["Comprehensive<br/>Comparison"]
style B fill:#9b59b6,color:#fff,stroke:#333
style I fill:#1abc9c,color:#fff,stroke:#333
This is related to the self-ask method from Press et al. (2022) and the least-to-most prompting approach from Zhou et al. (2022). Both demonstrate that explicit sub-question generation dramatically improves multi-hop QA — self-ask improves accuracy by letting the model “ask itself” follow-up questions before answering, and plugging in a search engine to answer those follow-ups improves accuracy further.
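The self-ask format is simple enough to sketch directly: the model emits "Follow up:" lines, an external tool answers each, and a final "So the final answer is:" line closes the loop. A minimal driver, with a scripted stand-in for the model and a lookup table for the search tool (both purely illustrative):

```python
def self_ask(question, model, search):
    """Drive the self-ask loop: the model emits follow-up questions,
    an external search answers each, until a final answer appears."""
    transcript = f"Question: {question}\n"
    while True:
        line = model(transcript)
        transcript += line + "\n"
        if line.startswith("So the final answer is:"):
            return line.removeprefix("So the final answer is:").strip()
        if line.startswith("Follow up:"):
            sub_q = line.removeprefix("Follow up:").strip()
            transcript += f"Intermediate answer: {search(sub_q)}\n"

# Scripted stand-ins for the LLM and the search engine.
script = iter([
    "Follow up: Who made ChatGPT?",
    "Follow up: Who is Microsoft's CEO?",
    "So the final answer is: Satya Nadella",
])
model = lambda transcript: next(script)
search = {
    "Who made ChatGPT?": "OpenAI",
    "Who is Microsoft's CEO?": "Satya Nadella",
}.get

answer = self_ask("Who leads the company backing ChatGPT's maker?", model, search)
```

The key property Press et al. exploit is visible even here: each intermediate answer enters the transcript, so the model's next follow-up can build on it.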
Implementation: Parallel Sub-Question Decomposition
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
import json
llm = ChatOpenAI(model="gpt-4o", temperature=0)
fast_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
class SubQuestionState(TypedDict):
messages: Annotated[list, add_messages]
sub_questions: list[dict] # [{question, depends_on, source_hint}]
answers: dict # {question_id: answer}
final_answer: str
def decompose_query(state: SubQuestionState) -> dict:
"""Break the complex query into atomic sub-questions."""
query = state["messages"][-1].content
response = llm.invoke([
{"role": "system", "content": (
"Decompose this complex question into atomic sub-questions.\n\n"
"For each sub-question, specify:\n"
"- id: a unique identifier (sq1, sq2, ...)\n"
"- question: the sub-question text\n"
"- depends_on: list of sub-question IDs whose answers are needed "
" before this one can be answered (empty list if independent)\n"
"- source_hint: suggested data source "
" (vector_store, sql_database, api, knowledge_graph, web_search)\n\n"
"Output as JSON array. Order sub-questions so dependencies come first.\n"
"Generate 2-6 sub-questions maximum."
)},
{"role": "user", "content": query},
])
try:
sub_questions = json.loads(response.content)
except json.JSONDecodeError:
sub_questions = [{"id": "sq1", "question": query, "depends_on": [], "source_hint": "vector_store"}]
return {"sub_questions": sub_questions, "answers": {}}
def answer_sub_questions(state: SubQuestionState) -> dict:
"""Answer all sub-questions, respecting dependency order."""
sub_questions = state["sub_questions"]
answers = dict(state.get("answers", {}))
for sq in sub_questions:
sq_id = sq["id"]
if sq_id in answers:
continue
# Check if dependencies are met
deps = sq.get("depends_on", [])
if not all(d in answers for d in deps):
continue
# Build context from dependency answers
dep_context = ""
if deps:
dep_context = "Context from previous answers:\n"
for dep_id in deps:
dep_context += f"- {dep_id}: {answers[dep_id]}\n"
# Route to the appropriate retrieval source
source = sq.get("source_hint", "vector_store")
source_instruction = _get_source_instruction(source)
response = fast_llm.invoke([
{"role": "system", "content": (
f"Answer this sub-question using the specified data source.\n"
f"Source: {source}\n{source_instruction}\n"
"Be factual and concise. If you don't have enough information, "
"say so explicitly."
)},
{"role": "user", "content": f"{dep_context}\nQuestion: {sq['question']}"},
])
answers[sq_id] = response.content
return {"answers": answers}
def _get_source_instruction(source: str) -> str:
"""Return source-specific retrieval instructions."""
instructions = {
"vector_store": "Search the document knowledge base for relevant passages.",
"sql_database": "Query the structured database for exact data points.",
"api": "Call the appropriate API endpoint for real-time data.",
"knowledge_graph": "Traverse entity relationships in the knowledge graph.",
"web_search": "Search the web for up-to-date information.",
}
return instructions.get(source, "Use any available source.")
def check_all_answered(state: SubQuestionState) -> str:
"""Check if all sub-questions have been answered."""
answers = state.get("answers", {})
sub_questions = state.get("sub_questions", [])
if len(answers) >= len(sub_questions):
return "synthesize"
return "answer"
def synthesize_answer(state: SubQuestionState) -> dict:
"""Compose the final answer from all sub-question answers."""
query = state["messages"][-1].content
sub_questions = state["sub_questions"]
answers = state.get("answers", {})
qa_text = "\n\n".join(
f"**{sq['id']}**: {sq['question']}\n"
f"**Source**: {sq.get('source_hint', 'unknown')}\n"
f"**Answer**: {answers.get(sq['id'], 'Not answered')}"
for sq in sub_questions
)
response = llm.invoke([
{"role": "system", "content": (
"Synthesize a comprehensive answer from the sub-question results below. "
"Integrate all facts into a coherent response. "
"Note which source each piece of information came from."
)},
{"role": "user", "content": (
f"Original question: {query}\n\n"
f"Sub-question results:\n{qa_text}"
)},
])
return {
"messages": [{"role": "assistant", "content": response.content}],
"final_answer": response.content,
}
# Build the graph
graph = StateGraph(SubQuestionState)
graph.add_node("decompose", decompose_query)
graph.add_node("answer", answer_sub_questions)
graph.add_node("synthesize", synthesize_answer)
graph.add_edge(START, "decompose")
graph.add_edge("decompose", "answer")
graph.add_conditional_edges("answer", check_all_answered, {
"answer": "answer",
"synthesize": "synthesize",
})
graph.add_edge("synthesize", END)
sub_question_agent = graph.compile()
Dependency-Aware Execution
The key insight is the depends_on field. Some sub-questions are independent (can be retrieved in parallel), while others depend on prior answers:
graph TD
Q["What is the performance gap between<br/>fine-tuned Llama 3 and GPT-4o for<br/>RAG pipelines, and what drives the cost?"]
Q --> SQ1["SQ1: What is Llama 3's typical<br/>RAG accuracy?<br/>depends_on: []"]
Q --> SQ2["SQ2: What is GPT-4o's typical<br/>RAG accuracy?<br/>depends_on: []"]
Q --> SQ3["SQ3: What does fine-tuning<br/>Llama 3 cost?<br/>depends_on: []"]
Q --> SQ4["SQ4: What is the performance gap?<br/>depends_on: [sq1, sq2]"]
Q --> SQ5["SQ5: What drives the cost difference?<br/>depends_on: [sq3, sq2]"]
SQ1 --> SQ4
SQ2 --> SQ4
SQ2 --> SQ5
SQ3 --> SQ5
style SQ1 fill:#56cc9d,stroke:#333,color:#fff
style SQ2 fill:#56cc9d,stroke:#333,color:#fff
style SQ3 fill:#56cc9d,stroke:#333,color:#fff
style SQ4 fill:#6cc3d5,stroke:#333,color:#fff
style SQ5 fill:#6cc3d5,stroke:#333,color:#fff
SQ1, SQ2, and SQ3 have no dependencies — they can be answered in parallel. SQ4 and SQ5 depend on prior answers and must wait. The answer_sub_questions node iterates until all questions are resolved, handling this dependency ordering naturally.
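The same dependency ordering can be computed explicitly. A small helper (independent of the agent code, names illustrative) groups sub-question IDs into waves, where every wave can be retrieved concurrently:

```python
def parallel_batches(sub_questions: list[dict]) -> list[list[str]]:
    """Group sub-question ids into waves: every id in a wave has all of its
    depends_on satisfied by earlier waves, so a wave can run concurrently."""
    done: set[str] = set()
    remaining = {sq["id"]: set(sq.get("depends_on", [])) for sq in sub_questions}
    batches: list[list[str]] = []
    while remaining:
        ready = sorted(q for q, deps in remaining.items() if deps <= done)
        if not ready:  # circular dependency: fail fast instead of looping forever
            raise ValueError(f"unresolvable dependencies: {sorted(remaining)}")
        batches.append(ready)
        done.update(ready)
        for q in ready:
            del remaining[q]
    return batches

# The five sub-questions from the diagram above.
sqs = [
    {"id": "sq1", "depends_on": []},
    {"id": "sq2", "depends_on": []},
    {"id": "sq3", "depends_on": []},
    {"id": "sq4", "depends_on": ["sq1", "sq2"]},
    {"id": "sq5", "depends_on": ["sq3", "sq2"]},
]
```

For the diagram's five sub-questions this yields two waves: sq1-sq3 together, then sq4 and sq5. The explicit cycle check also guards against a decomposer that emits mutually dependent sub-questions, which would otherwise loop forever.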
Interleaved Retrieval-Reasoning (IRCoT)
Why Interleaving Matters
Both plan-and-execute and sub-question decomposition generate the full decomposition before any retrieval happens. This works well when the question structure is clear upfront, but fails when what to retrieve next depends on what was just retrieved.
IRCoT (Trivedi et al., 2023) addresses this by interleaving retrieval with chain-of-thought reasoning — each reasoning step generates a retrieval query, and each retrieval result informs the next reasoning step. On HotpotQA, 2WikiMultihopQA, MuSiQue, and IIRC, IRCoT improved retrieval recall by up to 21 points and downstream QA accuracy by up to 15 points over one-shot retrieval.
graph TD
subgraph OneShot["One-Shot Retrieval"]
A1["Query"] --> A2["Retrieve All"] --> A3["Reason"] --> A4["Answer"]
end
subgraph IRCoT["Interleaved Retrieval-Reasoning"]
B1["Query"] --> B2["Reason Step 1"]
B2 --> B3["Retrieve for Step 1"]
B3 --> B4["Reason Step 2<br/>(uses Step 1 result)"]
B4 --> B5["Retrieve for Step 2"]
B5 --> B6["Reason Step 3"]
B6 --> B7["Answer"]
end
style OneShot fill:#fef2f2,stroke:#ef4444
style IRCoT fill:#f0fdf4,stroke:#22c55e
The crucial difference: in IRCoT, the retrieval query at step N is informed by the reasoning and retrieval results from steps 1 through N-1. This means the agent dynamically adjusts what it searches for based on what it has already learned.
Implementation: IRCoT Agent
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o", temperature=0)
class IRCoTState(TypedDict):
messages: Annotated[list, add_messages]
original_query: str
reasoning_chain: list[dict] # [{thought, query, retrieved, conclusion}]
iteration: int
final_answer: str
def reason_and_query(state: IRCoTState) -> dict:
"""Generate the next reasoning step and retrieval query."""
query = state.get("original_query", state["messages"][-1].content)
chain = state.get("reasoning_chain", [])
iteration = state.get("iteration", 0)
# Build chain-of-thought context
cot_context = ""
if chain:
cot_context = "Reasoning so far:\n"
for i, step in enumerate(chain, 1):
cot_context += f"\nStep {i}:\n"
cot_context += f" Thought: {step['thought']}\n"
cot_context += f" Retrieved: {step['retrieved'][:200]}\n"
cot_context += f" Conclusion: {step['conclusion']}\n"
response = llm.invoke([
{"role": "system", "content": (
"You are reasoning step-by-step to answer a complex question.\n"
"Based on what you know so far, generate:\n"
"1. THOUGHT: What do you still need to find out?\n"
"2. QUERY: A specific search query to retrieve the needed information\n\n"
"If you have enough information to answer, instead output:\n"
"THOUGHT: I have enough information.\n"
"ANSWER: <your final answer>\n\n"
"Output format (strict):\n"
"THOUGHT: <reasoning>\n"
"QUERY: <search query>\n"
"OR\n"
"THOUGHT: <reasoning>\n"
"ANSWER: <final answer>"
)},
{"role": "user", "content": (
f"Original question: {query}\n\n{cot_context}"
)},
])
text = response.content.strip()
# Parse the response
if "ANSWER:" in text:
answer_part = text.split("ANSWER:", 1)[1].strip()
return {
"final_answer": answer_part,
"iteration": iteration + 1,
}
thought = ""
search_query = ""
if "THOUGHT:" in text:
thought = text.split("THOUGHT:", 1)[1].split("QUERY:", 1)[0].strip()
if "QUERY:" in text:
search_query = text.split("QUERY:", 1)[1].strip()
# Store the partial reasoning step (retrieval will fill in the rest)
new_chain = list(chain)
new_chain.append({
"thought": thought,
"query": search_query,
"retrieved": "", # Will be filled by retrieve step
"conclusion": "", # Will be filled by conclude step
})
return {
"original_query": query,
"reasoning_chain": new_chain,
"iteration": iteration + 1,
}
def retrieve_for_step(state: IRCoTState) -> dict:
"""Retrieve documents for the current reasoning step's query."""
chain = state.get("reasoning_chain", [])
if not chain:
return {}
current_step = chain[-1]
search_query = current_step.get("query", "")
if not search_query:
return {}
# Retrieve from vector store (replace with your actual retriever)
docs = vectorstore.similarity_search(search_query, k=3)
retrieved_text = "\n".join(d.page_content[:300] for d in docs)
# Update the current step with retrieved content
updated_chain = list(chain)
updated_chain[-1] = {**current_step, "retrieved": retrieved_text}
return {"reasoning_chain": updated_chain}
def conclude_step(state: IRCoTState) -> dict:
"""Draw a conclusion from the retrieved information for this step."""
chain = state.get("reasoning_chain", [])
if not chain:
return {}
current_step = chain[-1]
response = llm.invoke([
{"role": "system", "content": (
"Based on the thought and retrieved information, "
"write a brief factual conclusion. One or two sentences."
)},
{"role": "user", "content": (
f"Thought: {current_step['thought']}\n"
f"Retrieved: {current_step['retrieved']}"
)},
])
updated_chain = list(chain)
updated_chain[-1] = {**current_step, "conclusion": response.content}
return {"reasoning_chain": updated_chain}
def should_continue_ircot(state: IRCoTState) -> str:
"""Check if the agent has produced a final answer or hit limits."""
if state.get("final_answer"):
return "done"
if state.get("iteration", 0) >= 6:
return "force_answer"
return "retrieve"
def force_answer(state: IRCoTState) -> dict:
"""Force a final answer from whatever reasoning has been gathered."""
query = state.get("original_query", "")
chain = state.get("reasoning_chain", [])
chain_text = "\n".join(
f"- {step['conclusion']}" for step in chain if step.get("conclusion")
)
response = llm.invoke([
{"role": "system", "content": "Answer the question using only the facts below."},
{"role": "user", "content": f"Question: {query}\n\nFacts:\n{chain_text}"},
])
return {
"messages": [{"role": "assistant", "content": response.content}],
"final_answer": response.content,
}
def format_answer(state: IRCoTState) -> dict:
"""Format the final answer as a message."""
return {
"messages": [{"role": "assistant", "content": state["final_answer"]}],
}
# Build the IRCoT graph
graph = StateGraph(IRCoTState)
graph.add_node("reason", reason_and_query)
graph.add_node("retrieve", retrieve_for_step)
graph.add_node("conclude", conclude_step)
graph.add_node("force_answer", force_answer)
graph.add_node("format_answer", format_answer)
graph.add_edge(START, "reason")
graph.add_conditional_edges("reason", should_continue_ircot, {
"retrieve": "retrieve",
"done": "format_answer",
"force_answer": "force_answer",
})
graph.add_edge("retrieve", "conclude")
graph.add_edge("conclude", "reason")
graph.add_edge("force_answer", END)
graph.add_edge("format_answer", END)
ircot_agent = graph.compile()
When to Use IRCoT
IRCoT excels when the sub-questions aren’t known upfront — when each retrieval step reveals what needs to be retrieved next. This is common in:
- Exploratory research — “What factors contributed to X?” (you don’t know the factors until you start retrieving)
- Chain reasoning — “Who mentored the person who invented Y?” (each hop depends on the previous)
- Conditional retrieval — “If the system uses X architecture, check for A; otherwise check for B”
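One practical note on the implementation above: the fragile part is parsing the strict THOUGHT/QUERY/ANSWER output format. Factoring it into a standalone function (a sketch with an illustrative name) makes it unit-testable separately from the LLM:

```python
def parse_reasoning(text: str) -> dict:
    """Parse the strict THOUGHT/QUERY/ANSWER format into its parts.
    Missing sections come back as empty strings."""
    out = {"thought": "", "query": "", "answer": ""}
    text = text.strip()
    if "ANSWER:" in text:
        text, answer = text.split("ANSWER:", 1)
        out["answer"] = answer.strip()
    if "QUERY:" in text:
        text, query = text.split("QUERY:", 1)
        out["query"] = query.strip()
    if "THOUGHT:" in text:
        out["thought"] = text.split("THOUGHT:", 1)[1].strip()
    return out

step = parse_reasoning("THOUGHT: Need the founding year.\nQUERY: OpenAI founding year")
done = parse_reasoning("THOUGHT: I have enough information.\nANSWER: 2015")
```

Splitting off trailing sections first (ANSWER, then QUERY) means each branch only ever sees the text before its marker, so the three fields never bleed into each other.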
Multi-Hop Retrieval over Heterogeneous Sources
The Routing Problem
Real-world retrieval agents don’t query a single vector store. They need to route sub-questions to the right data source:
| Data Source | Best For | Example Query |
|---|---|---|
| Vector store | Semantic search over documents | “What are best practices for chunking?” |
| SQL database | Exact counts, aggregations, structured data | “How many users signed up last month?” |
| REST API | Real-time data, external services | “What is the current price of X?” |
| Knowledge graph | Entity relationships, traversals | “Who reports to the VP of Engineering?” |
| Web search | Up-to-date information not in local sources | “What was announced at the latest conference?” |
Source-Aware Query Router
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
import json
llm = ChatOpenAI(model="gpt-4o", temperature=0)
class MultiSourceState(TypedDict):
messages: Annotated[list, add_messages]
sub_questions: list[dict]
routed_questions: list[dict] # [{question, source, answer}]
final_answer: str
def decompose_and_route(state: MultiSourceState) -> dict:
"""Decompose the query and assign each sub-question to a data source."""
query = state["messages"][-1].content
response = llm.invoke([
{"role": "system", "content": (
"Decompose this question into sub-questions and route each to the "
"best data source.\n\n"
"Available sources:\n"
"- vector_store: technical documentation, articles, guides\n"
"- sql_database: user data, metrics, transaction records\n"
"- api: real-time pricing, stock data, weather\n"
"- knowledge_graph: entity relationships, org charts, taxonomies\n"
"- web_search: recent news, announcements, current events\n\n"
"Output JSON array: [{\"id\": \"sq1\", \"question\": \"...\", "
"\"source\": \"...\", \"depends_on\": []}]\n"
"Generate 2-5 sub-questions."
)},
{"role": "user", "content": query},
])
try:
sub_questions = json.loads(response.content)
except json.JSONDecodeError:
sub_questions = [{"id": "sq1", "question": query, "source": "vector_store", "depends_on": []}]
return {"sub_questions": sub_questions, "routed_questions": []}
def execute_routed_queries(state: MultiSourceState) -> dict:
"""Execute each sub-question against its assigned data source."""
sub_questions = state["sub_questions"]
answered = {rq["id"]: rq for rq in state.get("routed_questions", [])}
new_answered = list(state.get("routed_questions", []))
for sq in sub_questions:
if sq["id"] in answered:
continue
# Check dependencies
deps = sq.get("depends_on", [])
if not all(d in answered for d in deps):
continue
# Gather dependency context
dep_context = ""
for dep_id in deps:
dep_context += f"{dep_id}: {answered[dep_id]['answer']}\n"
# Route to the appropriate retrieval function
source = sq["source"]
answer = _retrieve_from_source(source, sq["question"], dep_context)
result = {**sq, "answer": answer}
new_answered.append(result)
answered[sq["id"]] = result
return {"routed_questions": new_answered}
def _retrieve_from_source(source: str, question: str, context: str) -> str:
"""Route retrieval to the appropriate data source."""
if source == "vector_store":
docs = vectorstore.similarity_search(question, k=3)
doc_text = "\n".join(d.page_content[:300] for d in docs)
return _answer_from_context(question, doc_text, context)
elif source == "sql_database":
# Generate and execute SQL
sql = _generate_sql(question, context)
result = db.execute(sql)
return f"SQL result: {result}"
elif source == "api":
# Call the appropriate API
return _call_api(question, context)
elif source == "knowledge_graph":
# Query the knowledge graph
cypher = _generate_cypher(question, context)
result = graph_db.query(cypher)
return f"Graph result: {result}"
elif source == "web_search":
# Web search fallback
results = web_search(question)
return _answer_from_context(question, results, context)
return "Source not available"
def _answer_from_context(question: str, retrieved: str, dep_context: str) -> str:
"""Answer a sub-question from retrieved context."""
response = ChatOpenAI(model="gpt-4o-mini", temperature=0).invoke([
{"role": "system", "content": "Answer the question using ONLY the provided context."},
{"role": "user", "content": (
f"Context from previous steps: {dep_context}\n\n"
f"Retrieved documents:\n{retrieved}\n\n"
f"Question: {question}"
)},
])
return response.content
def _generate_sql(question: str, context: str) -> str:
"""Generate a SQL query from a natural language question."""
response = ChatOpenAI(model="gpt-4o-mini", temperature=0).invoke([
{"role": "system", "content": (
"Generate a SQL query to answer this question. "
"Return ONLY the SQL, no explanation. "
"Use standard PostgreSQL syntax."
)},
{"role": "user", "content": f"Context: {context}\nQuestion: {question}"},
])
return response.content
def _generate_cypher(question: str, context: str) -> str:
"""Generate a Cypher query for Neo4j from a natural language question."""
response = ChatOpenAI(model="gpt-4o-mini", temperature=0).invoke([
{"role": "system", "content": (
"Generate a Cypher query to answer this question. "
"Return ONLY the Cypher query, no explanation."
)},
{"role": "user", "content": f"Context: {context}\nQuestion: {question}"},
])
return response.content
def check_all_routed(state: MultiSourceState) -> str:
answered = {rq["id"] for rq in state.get("routed_questions", [])}
all_ids = {sq["id"] for sq in state.get("sub_questions", [])}
if answered >= all_ids:
return "synthesize"
return "execute"
def synthesize_multi_source(state: MultiSourceState) -> dict:
"""Synthesize the final answer from multi-source results."""
query = state["messages"][-1].content
results = state.get("routed_questions", [])
results_text = "\n\n".join(
f"**{r['id']}** ({r['source']}): {r['question']}\n→ {r['answer']}"
for r in results
)
response = llm.invoke([
{"role": "system", "content": (
"Synthesize a comprehensive answer from results gathered across "
"multiple data sources. Note which source each fact comes from."
)},
{"role": "user", "content": f"Question: {query}\n\nResults:\n{results_text}"},
])
return {
"messages": [{"role": "assistant", "content": response.content}],
"final_answer": response.content,
}
# Build the multi-source graph
graph = StateGraph(MultiSourceState)
graph.add_node("decompose_and_route", decompose_and_route)
graph.add_node("execute", execute_routed_queries)
graph.add_node("synthesize", synthesize_multi_source)
graph.add_edge(START, "decompose_and_route")
graph.add_edge("decompose_and_route", "execute")
graph.add_conditional_edges("execute", check_all_routed, {
"execute": "execute",
"synthesize": "synthesize",
})
graph.add_edge("synthesize", END)
multi_source_agent = graph.compile()
Source Selection Matrix
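The dependency gate inside execute_routed_queries is worth isolating as a pure function, so the scheduling logic can be unit-tested without any retrieval or LLM calls. A minimal sketch (ready_sub_questions is a hypothetical helper, not part of the graph above):

```python
def ready_sub_questions(sub_questions: list[dict], answered_ids: set[str]) -> list[dict]:
    """Return the sub-questions that can run now: not yet answered,
    and all of their dependencies already answered. Mirrors the
    dependency check in execute_routed_queries as pure logic."""
    return [
        sq for sq in sub_questions
        if sq["id"] not in answered_ids
        and all(d in answered_ids for d in sq.get("depends_on", []))
    ]

# Sample decomposition: sq3 depends on both independent sub-questions.
subqs = [
    {"id": "sq1", "question": "Cost of vLLM hosting?", "depends_on": []},
    {"id": "sq2", "question": "Cost of GPT-4o API?", "depends_on": []},
    {"id": "sq3", "question": "Which is cheaper at 10k queries/day?", "depends_on": ["sq1", "sq2"]},
]
```

On the first pass only sq1 and sq2 are ready (and could run in parallel); sq3 unblocks once both are answered, which is exactly why the graph loops through the execute node until check_all_routed says otherwise.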
How should the router decide which source to use? The decomposer LLM handles this through its prompt instructions, but here’s the mental model behind them:
graph TD
A["Sub-Question"] --> B{"Question Type?"}
B -->|"Conceptual / How-to"| C["Vector Store<br/>(semantic search)"]
B -->|"Quantitative / Exact"| D["SQL Database<br/>(structured query)"]
B -->|"Real-time / External"| E["API / Web Search<br/>(live data)"]
B -->|"Relationship / Graph"| F["Knowledge Graph<br/>(traversal)"]
B -->|"Comparative"| G["Multiple Sources<br/>(fan-out)"]
style B fill:#f5a623,color:#fff,stroke:#333
style C fill:#56cc9d,stroke:#333,color:#fff
style D fill:#6cc3d5,stroke:#333,color:#fff
style E fill:#ff7851,stroke:#333,color:#fff
style F fill:#9b59b6,stroke:#333,color:#fff
style G fill:#e67e22,stroke:#333,color:#fff
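The routing decision lives in the decomposer prompt, but a deterministic fallback is useful when the LLM omits a source or invents an invalid one. A rough sketch, purely illustrative — the keyword lists are assumptions you would tune for your own data landscape:

```python
VALID_SOURCES = {"vector_store", "sql_database", "api", "knowledge_graph", "web_search"}

def fallback_source(question: str) -> str:
    """Keyword-heuristic routing fallback (illustrative only)."""
    q = question.lower()
    if any(w in q for w in ("total", "average", "count", "revenue", "how many")):
        return "sql_database"      # quantitative / exact
    if any(w in q for w in ("related", "connected", "depends on", "relationship")):
        return "knowledge_graph"   # relationship / traversal
    if any(w in q for w in ("latest", "current", "today", "this week")):
        return "web_search"        # real-time / external
    return "vector_store"          # conceptual default

def validate_source(source: str, question: str) -> str:
    """Keep the LLM's routing choice if it names a real source; else fall back."""
    return source if source in VALID_SOURCES else fallback_source(question)
```

Calling validate_source on each decomposed sub-question before execution prevents a hallucinated source name from crashing the execute node.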
Comparing Planning Strategies
Strategy Selection Guide
| Strategy | Pre-plans | Adapts Mid-Execution | Parallel Sub-Qs | Multi-Source | Best For |
|---|---|---|---|---|---|
| ReAct (baseline) | No | Yes (every step) | No | Via tool routing | Simple 1-5 step tasks |
| Plan-and-Execute | Full plan | Via re-planning | Sequential | Manual routing | Structured multi-step research |
| Sub-Question Decomposition | Decomposition | Limited | Yes (independent sub-Qs) | Per sub-question | Parallel fact-gathering |
| IRCoT | No (step-by-step) | Yes (each step) | No | Per retrieval step | Chain reasoning, exploration |
| Multi-Source Router | Decomposition + routing | Limited | Yes (same-level sub-Qs) | Native | Heterogeneous data landscapes |
Decision Tree
graph TD
A["Complex query?"] -->|No| B["Use ReAct"]
A -->|Yes| C{"Sub-questions<br/>known upfront?"}
C -->|Yes| D{"Independent<br/>or dependent?"}
C -->|No| E["Use IRCoT<br/>(interleaved)"]
D -->|Mostly independent| F{"Multiple<br/>data sources?"}
D -->|Sequential chain| G["Use Plan-and-Execute<br/>(with re-planning)"]
F -->|Yes| H["Use Multi-Source<br/>Router"]
F -->|No| I["Use Sub-Question<br/>Decomposition"]
style B fill:#56cc9d,stroke:#333,color:#fff
style E fill:#9b59b6,stroke:#333,color:#fff
style G fill:#e67e22,stroke:#333,color:#fff
style H fill:#6cc3d5,stroke:#333,color:#fff
style I fill:#ff7851,stroke:#333,color:#fff
Example Query Routing
| Query | Strategy | Why |
|---|---|---|
| “What is RAG?” | ReAct (single retrieval) | Atomic question, one source |
| “Compare chunking strategies: fixed vs. semantic vs. agentic” | Sub-Question Decomposition | Three independent sub-questions |
| “Who is the CTO of the company that developed the model beating GPT-4 on MMLU?” | IRCoT | Each hop depends on the previous |
| “Summarize our Q3 revenue from the database and compare with industry trends from reports” | Multi-Source Router | SQL for revenue + vector store for reports |
| “Research the top 5 vector databases, benchmark their performance, and recommend one for our use case” | Plan-and-Execute | Multi-step research with defined phases |
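The decision tree collapses to a few lines of code once its four branch questions are answered — in practice a cheap classifier LLM call would supply the booleans, which are assumptions here:

```python
def choose_strategy(
    is_complex: bool,
    subqs_known_upfront: bool,
    subqs_independent: bool,
    multiple_sources: bool,
) -> str:
    """Map the decision tree above onto a planning strategy name."""
    if not is_complex:
        return "react"
    if not subqs_known_upfront:
        return "ircot"                      # interleave reasoning and retrieval
    if not subqs_independent:
        return "plan_and_execute"           # sequential chain with re-planning
    if multiple_sources:
        return "multi_source_router"
    return "sub_question_decomposition"
```

Encoding the choice this way keeps strategy selection deterministic and auditable, rather than burying it in a planner prompt.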
Domain-Specific Cognitive Architectures
Why General Planning Isn’t Enough
Harrison Chase’s “Planning for Agents” argues that nearly all production agents use domain-specific cognitive architectures — not general-purpose planning. General approaches like plan-and-solve show improvement on benchmarks, but production systems need architectures tailored to their specific task.
The insight: rather than asking the LLM to plan, encode the planning logic in code. The LLM handles reasoning and tool selection within each step, but the flow between steps is deterministic and purpose-built.
Example: Research Report Pipeline
A domain-specific architecture for generating research reports over multiple data sources:
class ResearchReportState(TypedDict):
messages: Annotated[list, add_messages]
topic: str
# Phase 1: Scope
scope_questions: list[str]
# Phase 2: Gather
source_results: dict # {source_name: [results]}
# Phase 3: Analyze
key_findings: list[str]
contradictions: list[str]
gaps: list[str]
# Phase 4: Synthesize
report: str
def scope_topic(state: ResearchReportState) -> dict:
"""Phase 1: Define the research scope and generate targeted questions."""
topic = state["messages"][-1].content
response = llm.invoke([
{"role": "system", "content": (
"You are scoping a research report. Generate 3-5 specific, "
"retrievable questions that together cover the topic comprehensively. "
"Output as JSON array of strings."
)},
{"role": "user", "content": f"Topic: {topic}"},
])
try:
questions = json.loads(response.content)
except json.JSONDecodeError:
questions = [topic]
return {"topic": topic, "scope_questions": questions}
def gather_from_docs(state: ResearchReportState) -> dict:
"""Phase 2a: Retrieve from document store."""
results = dict(state.get("source_results", {}))
doc_results = []
for question in state.get("scope_questions", []):
docs = vectorstore.similarity_search(question, k=3)
for doc in docs:
doc_results.append({
"question": question,
"content": doc.page_content,
"source": doc.metadata.get("source", "docs"),
})
results["documents"] = doc_results
return {"source_results": results}
def gather_from_database(state: ResearchReportState) -> dict:
"""Phase 2b: Query structured data."""
results = dict(state.get("source_results", {}))
db_results = []
for question in state.get("scope_questions", []):
response = fast_llm.invoke([
{"role": "system", "content": (
"If this question can be answered with a SQL query, generate the SQL. "
"If not, respond with 'SKIP'. Return ONLY the SQL or 'SKIP'."
)},
{"role": "user", "content": question},
])
if "SKIP" not in response.content.upper():
# SQL is generated but not executed here; wire in a real connection (e.g. db.execute) to run it
db_results.append({
"question": question,
"sql": response.content,
"source": "database",
})
results["database"] = db_results
return {"source_results": results}
def analyze_findings(state: ResearchReportState) -> dict:
"""Phase 3: Analyze all gathered results for insights and contradictions."""
all_results = state.get("source_results", {})
flat_results = []
for source_name, items in all_results.items():
for item in items:
flat_results.append(f"[{source_name}] {item.get('content', item)}")
results_text = "\n\n".join(flat_results[:20]) # Limit context
response = llm.invoke([
{"role": "system", "content": (
"Analyze these research results. Identify:\n"
"1. key_findings: Main facts and insights (JSON array)\n"
"2. contradictions: Conflicting information (JSON array)\n"
"3. gaps: What's missing or needs more research (JSON array)\n\n"
"Output as JSON: {\"key_findings\": [...], \"contradictions\": [...], \"gaps\": [...]}"
)},
{"role": "user", "content": f"Topic: {state['topic']}\n\nResults:\n{results_text}"},
])
try:
analysis = json.loads(response.content)
except json.JSONDecodeError:
analysis = {"key_findings": [], "contradictions": [], "gaps": []}
return {
"key_findings": analysis.get("key_findings", []),
"contradictions": analysis.get("contradictions", []),
"gaps": analysis.get("gaps", []),
}
def synthesize_report(state: ResearchReportState) -> dict:
"""Phase 4: Generate the final research report."""
response = llm.invoke([
{"role": "system", "content": (
"Write a concise research report based on the analysis below. "
"Structure: Overview, Key Findings, Potential Issues, Conclusions. "
"Cite sources where possible."
)},
{"role": "user", "content": (
f"Topic: {state['topic']}\n\n"
f"Key Findings:\n" + "\n".join(f"- {f}" for f in state.get("key_findings", [])) + "\n\n"
f"Contradictions:\n" + "\n".join(f"- {c}" for c in state.get("contradictions", [])) + "\n\n"
f"Gaps:\n" + "\n".join(f"- {g}" for g in state.get("gaps", []))
)},
])
return {
"messages": [{"role": "assistant", "content": response.content}],
"report": response.content,
}
# Build the domain-specific pipeline
graph = StateGraph(ResearchReportState)
graph.add_node("scope", scope_topic)
graph.add_node("gather_docs", gather_from_docs)
graph.add_node("gather_db", gather_from_database)
graph.add_node("analyze", analyze_findings)
graph.add_node("synthesize", synthesize_report)
graph.add_edge(START, "scope")
graph.add_edge("scope", "gather_docs")
graph.add_edge("gather_docs", "gather_db")
graph.add_edge("gather_db", "analyze")
graph.add_edge("analyze", "synthesize")
graph.add_edge("synthesize", END)
research_pipeline = graph.compile()
The flow is not planned by the LLM — it’s hardcoded as Scope → Gather → Analyze → Synthesize. The LLM handles reasoning within each step, but the transitions between steps are deterministic. This is what Harrison Chase calls “domain-specific cognitive architectures” — the most reliable pattern for production agents.
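One natural extension, sketched here as an assumption rather than part of the pipeline above: replace the fixed analyze → synthesize edge with a conditional edge that loops back to gathering while the analysis reports gaps, capped so research always terminates (this assumes the state also tracks a rounds counter):

```python
def route_after_analyze(state: dict, max_rounds: int = 2) -> str:
    """Loop back to gathering while the analysis phase reports gaps,
    but cap the number of research rounds so the pipeline terminates."""
    if state.get("gaps") and state.get("rounds", 0) < max_rounds:
        return "gather_docs"
    return "synthesize"

# Hypothetical wiring, replacing graph.add_edge("analyze", "synthesize"):
# graph.add_conditional_edges("analyze", route_after_analyze,
#     {"gather_docs": "gather_docs", "synthesize": "synthesize"})
```

The key point stands either way: even this adaptive loop is encoded in code, not left to an LLM planner.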
Common Pitfalls and How to Fix Them
| Pitfall | Symptom | Fix |
|---|---|---|
| Over-decomposition | Simple questions split into 5+ sub-questions | Check query complexity first — route simple queries directly to retrieval |
| Plan never executes | Planner generates vague or impossible steps | Add step validation — each step must reference a concrete tool or source |
| Cascading errors | Wrong answer in step 1 corrupts all later steps | Add verification after each step; implement re-planning on low confidence |
| Source mismatch | Sub-question routed to wrong data source | Improve routing prompt with source descriptions and examples |
| Context explosion | All intermediate results accumulate in prompt | Summarize step results before passing to next step |
| Infinite re-planning | Re-plan triggers after every step | Cap re-plans (max 2-3); only re-plan when step results contradict the plan |
| Parallel fetch bottleneck | Independent sub-questions answered sequentially | Use async execution or LangGraph’s parallel node execution |
| No stopping condition | IRCoT loops forever on open-ended queries | Set max iterations (5-8); force answer when limit is reached |
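Several of these fixes (capped re-planning, IRCoT max iterations) reduce to the same mechanic: a counter in state plus a guard in the conditional edge. A generic sketch — the field names and defaults are assumptions, not fixed API:

```python
def bounded_router(
    state: dict,
    counter_key: str = "iterations",
    limit: int = 6,
    continue_node: str = "execute",
    stop_node: str = "synthesize",
) -> str:
    """Generic loop guard: keep looping until the agent signals done
    or the iteration cap is hit, then force the terminal node."""
    if state.get("done") or state.get(counter_key, 0) >= limit:
        return stop_node
    return continue_node
```

The node inside the loop is responsible for incrementing the counter on each pass; the guard then makes runaway loops structurally impossible.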
Conclusion
Complex retrieval questions require planning — no single-shot retrieval can compose facts from multiple sources, resolve multi-hop dependencies, or route sub-questions to heterogeneous data stores. The planning strategy you choose depends on the nature of the complexity.
Key takeaways:
- Single-shot RAG fails for multi-hop questions. The compositionality gap means models that answer simple questions well still fail to compose multiple facts. Explicit decomposition is the fix.
- Plan-and-execute separates planning from execution. The planner generates a full step-by-step plan; the executor handles one step at a time. Add re-planning for adaptability. Best for structured, multi-phase research tasks.
- Sub-question decomposition breaks queries into atomic questions with dependency tracking. Independent sub-questions can run in parallel. Each can be routed to a different data source. Best for parallel fact-gathering across known dimensions.
- IRCoT (interleaved retrieval-reasoning) alternates between chain-of-thought reasoning and retrieval. What to retrieve next is determined by what was just learned. Best for exploratory or chain-dependent questions where the decomposition isn’t known upfront.
- Heterogeneous source routing assigns each sub-question to the right backend — vector store for concepts, SQL for metrics, APIs for real-time data, knowledge graphs for relationships. The decomposer doubles as a router.
- Domain-specific cognitive architectures beat general-purpose planning for production systems. Encode the task flow in code (Scope → Gather → Analyze → Synthesize) and let the LLM handle reasoning within each step. This is how reliable agents are actually built.
Start with sub-question decomposition for most multi-hop retrieval tasks. Move to plan-and-execute when the task has clearly defined phases. Use IRCoT for exploratory chain reasoning. And when building for production, design a domain-specific pipeline where the flow is deterministic and the LLM handles step-level intelligence.
References
- Wang et al., Plan-and-Solve Prompting: Improving Zero-Shot Chain-of-Thought Reasoning by Large Language Models, ACL 2023.
- Press et al., Measuring and Narrowing the Compositionality Gap in Language Models, Findings of EMNLP 2023 — introduced the self-ask method and compositionality gap metric.
- Trivedi et al., Interleaving Retrieval with Chain-of-Thought Reasoning for Knowledge-Intensive Multi-Step Questions, ACL 2023 — introduced IRCoT.
- Zhou et al., Least-to-Most Prompting Enables Complex Reasoning in Large Language Models, ICLR 2023 — progressive sub-question decomposition from simple to complex.
- Yao et al., ReAct: Synergizing Reasoning and Acting in Language Models, ICLR 2023 — the baseline reasoning-acting loop.
- Harrison Chase, Planning for Agents, LangChain Blog, 2024 — domain-specific cognitive architectures vs. general-purpose planning.
- LangChain, Plan-and-Execute Agents, LangChain Blog, 2023 — the plan-and-execute agent paradigm.
- Nakajima, BabyAGI, 2023 — task-driven autonomous agent with planning and vectorstore-backed memory.
- Ridnik et al. (CodiumAI), Code Generation with AlphaCodium: From Prompt Engineering to Flow Engineering, 2024 — flow engineering as domain-specific cognitive architecture.
Read More
- Implement the agent loop that executes each plan step with Building a ReAct Agent from Scratch — the Thought-Action-Observation cycle used inside each executor.
- Build the state machines that orchestrate plan-and-execute flows with Building Agents with LangGraph — StateGraph, conditional routing, checkpointers, and subgraphs.
- Distribute sub-questions across specialized retrieval agents with Multi-Agent RAG Orchestration Patterns — supervisor and hierarchical topologies.
- Give planning agents persistent memory across sessions with Memory Systems for Long-Running Retrieval Agents — scratchpads, episodic recall, and cross-agent sharing.
- Connect plan steps to retrieval tools with Tool Use and Function Calling for Retrieval Agents — function calling, MCP, and dynamic tool selection.
- Build the retrieval pipelines sub-questions query with Building a RAG Pipeline from Scratch.
- Add self-correcting retrieval within plan steps with Hybrid and Corrective RAG Architectures.
- Route sub-questions to knowledge graphs with GraphRAG: Knowledge Graphs Meet Retrieval-Augmented Generation.
- Monitor multi-step agent execution with Observability for Multi-Turn LLM Conversations.